#include "config.h"

#include <time.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/poll.h>
#include <sys/epoll.h>
#include <sys/file.h>
#include <sys/mman.h>
#include <linux/fs.h>
#include <unistd.h>
#include <string.h>
#include <stdio.h>
#include <stdint.h>
#include <libgen.h>
#include <ctype.h>
#include <fcntl.h>
#include <libaio.h>
#include <limits.h>
#include <errno.h>
#include <sys/vfs.h>

#define DBG_SUBSYS S_LIBREPLICA

#include "lich_api.h"

#include "cache.h"
#include "disk.h"
#include "sysy_lib.h"
#include "core.h"
#include "ynet_rpc.h"
#include "disk_redis.h"
#include "job_dock.h"
#include "net_global.h"
#include "diskmd.h"
#include "bmap.h"
#include "fnotify.h"
#include "tpool.h"
#include "configure.h"
#include "nodectl.h"
#include "lich_aio.h"
#include "node.h"
#include "disk_maping.h"
#include "recovery.h"
#include "disk_slot.h"
#include "diskmd_pool.h"
#include "../../task/diskmd_recovery.h"
#include "disk_sqlite3.h"

#include "nvme.h"
/**
 * @file Local Disk Management
 *
 * 创建存储池：
 *
 * 删除存储池：
 *
 * 添加磁盘：
 *     写入磁盘头，并创建disk/%.disk符号链接，符号链接指向设备文件。磁盘头部包括diskid，nodename，poolname等信息。
 *
 * 磁盘的数据布局和用法:
 *     磁盘头的前1M，按ext4 super block布局，私有信息存入reserved字段，可以避开一些测试工具的识别。
 *
 *     通过disk bitmap管理磁盘空间，通过sqlite管理chkid到磁盘空间的映射。
 *
 * 删除磁盘：
 *     删除disk/%.disk符号链接，通过fnotify监听该目录，可以捕获该事件。
 *     - 何时关闭fd，同时要避免reuse fd造成的问题？
 *     - 如何减少check数据过程对性能造成的干扰？
 *     - RAID，拔盘会影响到其它盘的性能
 *
 * 删除缓存盘：
 *
 * 删除磁盘的数据恢复：
 *
 */

#define DISK_RECOVERY_FINISH 1
#define DISK_RECOVERY_SCAN_COUNT 5

/**
 * State of the local disk manager. A single instance is allocated by
 * diskmd_init() and published through __disk_manager__.
 */
typedef struct {
        //protect pool add/rm
        sy_rwlock_t lock;
        /* base directory; "<home>/disk/<idx>.disk" and "<home>/block/<idx>.block"
         * paths are built from it in this file */
        char home[MAX_PATH_LEN];
        /* pool table; a NULL entry is a free slot */
        pool_t *pool[POOL_MAX];
        /* NOTE(review): not referenced in the visible part of this file */
        int (*lost_callback)(int diskid, int cache);
} disk_manager_t;

/* Global manager instance; NULL until diskmd_init() has assigned it. */
disk_manager_t *__disk_manager__ = NULL;

/*
 * Bundle of (disk index, cache flag, manager).
 * NOTE(review): no user of this type is visible in this part of the file;
 * presumably it is the argument of a "disk lost" handler - confirm.
 */
typedef struct {
        int idx;
        int cache;
        disk_manager_t *manager;
} lost_t;

/*
 * Forward declarations of helpers used in this file. Most are defined
 * further down; a few (e.g. diskmd_disk_write_check, diskmd_remove,
 * diskmd_async_push_disk_task) are not defined in the visible part.
 */
STATIC int diskmd_pool_iterator(func_int1_t func_cb, void *arg);
STATIC int diskmd_disk_iterator(func_int1_t func_cb, void *arg);
STATIC int diskmd_disk_iterator_by_pool(const char *pool_name, func_int2_t func_cb, void *arg);

STATIC int diskmd_count_disk(int *disk_curr);

STATIC int diskmd_find_newdisk(disk_manager_t *manager);
STATIC int diskmd_add_newdisk(disk_manager_t *manager, int idx);
STATIC int diskmd_find_removedisk(disk_manager_t *manager);
STATIC int diskmd_remove_disk(disk_manager_t *manager, int idx);

STATIC void diskmd_update_tier();

#if 0
STATIC int diskmd_writeback_dispatch(const char *pool, int *diskid);
STATIC int diskmd_writeback_del(int diskid);
#endif

STATIC int diskmd_node_dfree1(uint64_t *_disk_total, uint64_t *_disk_free);

STATIC int diskmd_init(const char *home, uint64_t *max_chunk);
STATIC int diskmd_prep_env(const char *home);
STATIC int diskmd_load(disk_manager_t *manager);
STATIC int diskmd_disk_load(disk_manager_t *manager);

STATIC int diskmd_pool_ensure(disk_manager_t *manager, const char *pool_name);
STATIC int diskmd_pool_get_nolock(disk_manager_t *manager, const char *name, pool_t **pool);
STATIC int diskmd_pool_add_nolock(disk_manager_t *manager, pool_t *pool);
STATIC int diskmd_pool_rm_nolock(disk_manager_t *manager, const char *name, pool_t **pool);

STATIC int diskmd_add_disk(disk_manager_t *manager, const char *pool_name, disk_t *disk);
STATIC void diskmd_set_diskoffline(disk_t *disk);

STATIC int diskmd_set_disklost(disk_t *disk, const char *why);

/* inotify */
STATIC int diskmd_disklink_add(void *context, uint32_t mask);
STATIC int diskmd_disklink_rm(void *context, uint32_t mask);

STATIC int diskmd_infolink_add(void *context, uint32_t mask);
STATIC int diskmd_infolink_rm(void *context, uint32_t mask);

STATIC int diskmd_disk_write_check(int diskid);
STATIC void diskmd_remove(int idx);

int diskmd_async_push_disk_task(disk_t *disk, const char *reason);

/*
 *
 * func
 *
 */
/**
 * Call func_cb(pool, arg) once for every non-NULL entry of the manager's
 * pool table.
 *
 * The manager lock is read-held for the whole walk and each pool's own lock
 * is read-held around its callback. The walk stops at the first non-zero
 * value returned by a lock operation or by the callback.
 *
 * @param func_cb callback invoked with (pool, arg)
 * @param arg     opaque pointer forwarded to the callback
 * @return 0 on success, otherwise the first non-zero value encountered
 */
STATIC int diskmd_pool_iterator(func_int1_t func_cb, void *arg)
{
        int ret, slot;
        pool_t *pool;
        disk_manager_t *manager = __disk_manager__;

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (slot = 0; slot < POOL_MAX; slot++) {
                pool = manager->pool[slot];
                if (!pool)
                        continue;       /* free slot */

                ret = sy_rwlock_rdlock(&pool->lock);
                if (unlikely(ret))
                        GOTO(err_unlock, ret);

                ret = func_cb(pool, arg);
                if (unlikely(ret))
                        GOTO(err_unlock1, ret);

                sy_rwlock_unlock(&pool->lock);
        }

        sy_rwlock_unlock(&manager->lock);

        return 0;
err_unlock1:
        sy_rwlock_unlock(&pool->lock);
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

/**
 * Thin wrapper that forwards (func_cb, arg) to disk_slot_iterator().
 *
 * @return 0 on success, otherwise the value returned by disk_slot_iterator()
 */
STATIC int diskmd_disk_iterator(func_int1_t func_cb, void *arg)
{
        int ret = disk_slot_iterator(func_cb, arg);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Call func_cb(pool, disk, arg) for every disk recorded in the disk_array of
 * each pool whose name equals pool_name.
 *
 * The manager lock and the pool lock are read-held while the callbacks run.
 * Every disk is obtained with disk_slot_get() before its callback and handed
 * back with disk_slot_release() afterwards, on the error path as well.
 *
 * @param pool_name name of the pool to walk
 * @param func_cb   callback invoked with (pool, disk, arg)
 * @param arg       opaque pointer forwarded to the callback
 * @return 0 on success, otherwise the first non-zero value encountered
 */
STATIC int diskmd_disk_iterator_by_pool(const char *pool_name, func_int2_t func_cb, void *arg)
{
        int ret, i, j;
        pool_t *pool;
        disk_t *disk;
        disk_manager_t *manager = __disk_manager__;

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < POOL_MAX; i++) {
                pool = manager->pool[i];
                if (!pool)
                        continue;

                ret = sy_rwlock_rdlock(&pool->lock);
                if (unlikely(ret))
                        GOTO(err_unlock, ret);

                if (strcmp(pool_name, pool->name) != 0) {
                        /* not the requested pool */
                        sy_rwlock_unlock(&pool->lock);
                        continue;
                }

                for (j = 0; j < DISK_MAX; j++) {
                        if (!pool->disk_array[j])
                                continue;

                        ret = disk_slot_get(j, &disk);
                        if (unlikely(ret)) {
                                /* the pool lists the disk but no slot could be obtained */
                                YASSERT(0 && "why ?");
                                GOTO(err_unlock1, ret);
                        }

                        ret = func_cb(pool, disk, arg);
                        if (unlikely(ret))
                                GOTO(err_release, ret);

                        disk_slot_release(j);
                }

                sy_rwlock_unlock(&pool->lock);
        }

        sy_rwlock_unlock(&manager->lock);

        return 0;
err_release:
        disk_slot_release(j);
err_unlock1:
        sy_rwlock_unlock(&pool->lock);
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

static int diskmd_count_disk_cb(void *_disk, void *_array)
{
        disk_t *disk = _disk;
        int *disk_curr = _array;

        YASSERT(disk_curr[disk->idx] == 0);
        disk_curr[disk->idx] = 1;
        return 0;
}

/**
 * Mark, in disk_curr, the index of every disk visited by
 * diskmd_disk_iterator().
 *
 * @param disk_curr int array indexed by disk index; callers in this file pass
 *                  a zero-initialised array of DISK_MAX entries
 * @return 0 on success, otherwise the iterator's error
 */
STATIC int diskmd_count_disk(int *disk_curr)
{
        int ret = diskmd_disk_iterator(diskmd_count_disk_cb, disk_curr);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Create, load and register the disk that appeared at slot idx.
 *
 * Steps: check the slot with disk_slot_empty(), build the disk_t with
 * disk_create(), load it with disk_load() (which also yields the pool name
 * and a "bad" flag), then attach it to its pool through diskmd_add_disk().
 *
 * NOTE(review): when disk_load() returns 0 but flags the disk as bad, the
 * disk_t is freed and 0 is returned, so the caller cannot tell that nothing
 * was added - confirm this is intended.
 *
 * @param manager disk manager (its home directory is passed to disk_load)
 * @param idx     disk index
 * @return 0 on success (or for a bad disk, see note), otherwise an error code
 */
STATIC int diskmd_add_newdisk(disk_manager_t *manager, int idx)
{
        int ret, empty, bad;
        char pool_name[MAX_NAME_LEN];
        disk_t *disk;

        ret = disk_slot_empty(idx,  &empty);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!empty) {
                YASSERT(0 && "why disk exist?");
        }

        ret = disk_create(idx, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_load(disk, manager->home, TRUE, pool_name, &bad);
        if (unlikely(ret || bad))
                GOTO(err_free, ret);

        DINFO("got new disk %u size %ju\n", idx, disk->size);

        ret = diskmd_add_disk(manager, pool_name, disk);
        if (unlikely(ret))
                GOTO(err_free, ret);

        return 0;
err_free:
        yfree((void **)&disk);
err_ret:
        return ret;
}

/**
 * Queue an asynchronous "remove" task for the disk at slot idx.
 *
 * The slot is acquired for the duration of the push and released afterwards.
 * The return value of diskmd_async_push_disk_task() is not examined.
 *
 * @param manager unused
 * @param idx     disk index
 * @return 0 on success, otherwise the error of disk_slot_get()
 */
STATIC int diskmd_remove_setlost(disk_manager_t *manager, int idx)
{
        int ret;
        disk_t *disk;

        (void) manager;

        ret = disk_slot_get(idx, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        diskmd_async_push_disk_task(disk, "remove");
        disk_slot_release(idx);

        return 0;
err_ret:
        return ret;
}

/**
 * Scan "<home>/disk/<i>.disk" for every disk index and reconcile it with the
 * set of disks currently known to the slot table.
 *
 * - link accessible, disk unknown : add it via diskmd_add_newdisk()
 * - link accessible, disk known   : bring it online via diskmd_set_online();
 *                                   if its state changed, also call
 *                                   diskmd_recovery_stop_disk() for it
 * - link not accessible           : skipped here
 *
 * On success the tier information of all pools is refreshed.
 *
 * @return 0 on success, otherwise the first error (the scan is aborted)
 */
STATIC int diskmd_find_newdisk(disk_manager_t *manager)
{
        int ret, i, changed;
        char path[MAX_PATH_LEN];
        int disk_curr[DISK_MAX] = {0};

        ret = diskmd_count_disk(&disk_curr[0]);
        if (unlikely(ret))
                GOTO(err_ret ,ret);

        for (i = 0; i < DISK_MAX; i++) {
                snprintf(path, MAX_PATH_LEN, "%s/disk/%d.disk", manager->home, i);
                if (path_access(path) != 0)
                        continue;

                if (!disk_curr[i]) {
                        DINFO("disksm add new disk %d\n", i);

                        ret = diskmd_add_newdisk(manager, i);
                        if (unlikely(ret))
                                GOTO(err_ret ,ret);

                        continue;
                }

                /* already known: make sure it is online */
                changed = 0;
                ret = diskmd_set_online(i, &changed);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                if (changed) {
                        ret = diskmd_recovery_stop_disk(i);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        diskmd_update_tier();

        return 0;
err_ret:
        return ret;
}

/**
 * Find known disks whose "<home>/disk/<i>.disk" link is no longer accessible
 * and queue a removal task for each of them.
 *
 * @return 0 on success, otherwise the first error (the scan is aborted)
 */
STATIC int diskmd_find_removedisk(disk_manager_t *manager)
{
        int ret, i;
        char path[MAX_PATH_LEN];
        int disk_curr[DISK_MAX] = {0};

        ret = diskmd_count_disk(&disk_curr[0]);
        if (unlikely(ret))
                GOTO(err_ret ,ret);

        for (i = 0; i < DISK_MAX; i++) {
                snprintf(path, MAX_PATH_LEN, "%s/disk/%d.disk", manager->home, i);
                ret = path_access(path);
                if (likely(ret == 0))
                        continue;       /* link still present */

                if (!disk_curr[i])
                        continue;       /* link missing, but the disk was never loaded */

                DWARN("path %s ret %d\n", path, ret);

                ret = diskmd_remove_setlost(manager, i);
                if (unlikely(ret))
                        GOTO(err_ret ,ret);
        }

        return 0;
err_ret:
        return ret;
}

static int diskmd_update_tier_cb(void *_pool, void *_arg)
{
        (void) _arg;
        pool_t *pool = _pool;

        disk_manager_pool_update_tier(pool);
        return 0;
}

void diskmd_update_tier()
{
        diskmd_pool_iterator(diskmd_update_tier_cb, NULL);
}

/* Argument/result carrier for diskmd_pool_dfree_cb(). */
typedef struct {
        /* out: value computed by the callback for the matching pool (0 if none matched) */
        uint64_t disk_total;
        /* out: value computed by the callback for the matching pool (0 if none matched) */
        uint64_t disk_used;
        /* in: name of the pool to report on */
        char pool_name[MAX_NAME_LEN];
} dfree_arg_t;

/**
 * Pool iterator callback: when the visited pool is the one named in the
 * argument, query it with diskmd_pool_dfree1() and store total/used, each
 * multiplied by LICH_CHUNK_SPLIT, into the argument.
 *
 * @param _pool the pool_t being visited
 * @param _arg  dfree_arg_t carrying the wanted pool name and the results
 * @return 0 on success or when the pool does not match, otherwise the error
 *         of diskmd_pool_dfree1()
 */
STATIC int diskmd_pool_dfree_cb(void *_pool, void *_arg)
{
        int ret;
        uint64_t pool_total = 0, pool_used = 0;
        dfree_arg_t *arg = _arg;
        pool_t *pool = _pool;

        if (strcmp(arg->pool_name, pool->name) != 0)
                return 0;

        ret = diskmd_pool_dfree1(pool, &pool_total, &pool_used, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        arg->disk_total = pool_total * LICH_CHUNK_SPLIT;
        arg->disk_used = pool_used * LICH_CHUNK_SPLIT;

        return 0;
err_ret:
        return ret;
}

/**
 * Report the total and used space of the pool called pool_name.
 *
 * The values come from diskmd_pool_dfree_cb(), i.e. the figures of
 * diskmd_pool_dfree1() multiplied by LICH_CHUNK_SPLIT. If no pool matches,
 * both outputs are 0 and 0 is returned. The same happens when the disk
 * manager has not been initialised yet.
 *
 * A NULL name, or one that does not fit the MAX_NAME_LEN lookup buffer, is
 * handled like a name that matches no pool. The previous unbounded strcpy()
 * would have overrun the stack buffer in that case.
 *
 * @param pool_name   name of the pool
 * @param _disk_total out: total space
 * @param _disk_used  out: used space
 * @return 0 on success, otherwise the error raised during the pool iteration
 */
int diskmd_pool_dfree(const char *pool_name, uint64_t *_disk_total, uint64_t *_disk_used)
{
        int ret;
        size_t len;
        dfree_arg_t arg;
        disk_manager_t *manager = __disk_manager__;

        if (!manager) {
                DWARN("disk_manager uninited\n");
                *_disk_total = 0;
                *_disk_used = 0;
                return 0;
        }

        len = pool_name ? strlen(pool_name) : sizeof(arg.pool_name);
        if (unlikely(len >= sizeof(arg.pool_name))) {
                DWARN("invalid pool name, treated as unknown pool\n");
                *_disk_total = 0;
                *_disk_used = 0;
                return 0;
        }

        arg.disk_total = 0;
        arg.disk_used = 0;
        memcpy(arg.pool_name, pool_name, len + 1);      /* includes the NUL */

        ret = diskmd_pool_iterator(diskmd_pool_dfree_cb, &arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_disk_used = arg.disk_used;
        *_disk_total = arg.disk_total;

        return 0;
err_ret:
        return ret;
}

/**
 * Fill dfree with one (name, total, used) record per pool of this node.
 *
 * dfree is zeroed first. For every pool in the manager table the figures of
 * diskmd_pool_dfree1() are multiplied by LICH_CHUNK_SPLIT and appended at
 * index dfree->pool_count, which is then incremented. If the manager is not
 * initialised, dfree stays zeroed and 0 is returned.
 *
 * Only the manager lock is read-held during the walk.
 * NOTE(review): pool_count is not checked against the capacity of
 * dfree->pool_stat, which is declared elsewhere - confirm it holds POOL_MAX
 * entries.
 *
 * @param dfree out: per-pool statistics
 * @return 0 on success, otherwise the first error encountered
 */
int diskmd_node_dfree(nodedfree_t *dfree)
{
        int ret, slot;
        pool_t *pool;
        uint64_t pool_total = 0, pool_used = 0;
        disk_manager_t *manager = __disk_manager__;

        memset(dfree, 0x0, sizeof(*dfree));

        if (manager == NULL) {
                DWARN("diskmd uninited\n");
                return 0;
        }

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (slot = 0; slot < POOL_MAX; slot++) {
                pool = manager->pool[slot];
                if (pool == NULL)
                        continue;

                ret = diskmd_pool_dfree1(pool, &pool_total, &pool_used, 0);
                if (unlikely(ret))
                        GOTO(err_unlock, ret);

                /* append the record for this pool */
                strcpy(dfree->pool_stat[dfree->pool_count].name, pool->name);
                dfree->pool_stat[dfree->pool_count].total = pool_total * LICH_CHUNK_SPLIT;
                dfree->pool_stat[dfree->pool_count].used = pool_used * LICH_CHUNK_SPLIT;
                dfree->pool_count++;
        }

        sy_rwlock_unlock(&manager->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

/**
 * Sum the total and free space over all pools of this node.
 *
 * The figures are the raw values returned by diskmd_pool_dfree1(); unlike
 * diskmd_node_dfree() they are not multiplied by LICH_CHUNK_SPLIT here.
 *
 * Both outputs are written on every path that returns 0. Previously they were
 * left untouched when the manager was not initialised, so the caller read
 * indeterminate values after a "successful" call.
 *
 * @param _disk_total out: sum of the pool totals
 * @param _disk_free  out: sum of the pool totals minus sum of the used values
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int diskmd_node_dfree1(uint64_t *_disk_total, uint64_t *_disk_free)
{
        int ret, i;
        pool_t *pool;
        uint64_t pool_used = 0, pool_total = 0;
        uint64_t disk_used = 0, disk_total = 0;
        disk_manager_t *manager = __disk_manager__;

        if (manager == NULL) {
                DWARN("diskmd uninited\n");
                *_disk_free = 0;
                *_disk_total = 0;
                return 0;
        }

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < POOL_MAX; i++) {
                pool = manager->pool[i];
                if (unlikely(!pool))
                        continue;

                ret = diskmd_pool_dfree1(pool, &pool_total, &pool_used, 0);
                if (unlikely(ret))
                        GOTO(err_unlock, ret);

                disk_used += pool_used;
                disk_total += pool_total;
        }

        sy_rwlock_unlock(&manager->lock);

        *_disk_free = disk_total - disk_used;
        *_disk_total = disk_total;

        return 0;
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

int diskmd_pool_writeable(const char *pool, int force)
{
        int ret;
        static uint64_t total, used, dfree;
        static time_t last_update = 0;
        time_t now;

        now = gettime();
        if (now - last_update > 5) {
                ret = diskmd_pool_dfree(pool, &total, &used);
                if (unlikely(ret)) {
                        DERROR("disk error (%d) %s\n", ret, strerror(ret));
                        SERROR(0, "%s, disk error (%d) %s\n", M_DISK_META_ERROR, ret, strerror(ret));
                        if (ret == ESTALE)
                                ret = EAGAIN;
                        EXIT(ret);
                }

                dfree = total - used;
                last_update = now;
        }

        if (force) {
                if (dfree > cdsconf.disk_keep)
                        return 1;
                else
                        return 0;
        } else {
                if (dfree > cdsconf.disk_keep / 2)
                        return 1;
                else
                        return 0;
        }
}

/*
 * inotify
 */
/**
 * fnotify "add" handler registered by diskmd_init() for the "disk" directory:
 * logs the event mask and rescans for new disks.
 *
 * @param context unused
 * @param mask    event mask, only logged
 * @return 0 on success, otherwise the error of diskmd_find_newdisk()
 */
STATIC int diskmd_disklink_add(void *context, uint32_t mask)
{
        int ret;

        (void) context;

        DINFO("mask %u\n", mask);

        ret = diskmd_find_newdisk(__disk_manager__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * fnotify "remove" handler registered by diskmd_init() for the "disk"
 * directory: logs the event mask and rescans for disappeared disks.
 *
 * @param context unused
 * @param mask    event mask, only logged
 * @return 0 on success, otherwise the error of diskmd_find_removedisk()
 */
STATIC int diskmd_disklink_rm(void *context, uint32_t mask)
{
        int ret;

        (void) context;

        DWARN("mask %u\n", mask);

        ret = diskmd_find_removedisk(__disk_manager__);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * fnotify "add" handler registered by diskmd_init() for the "info" directory.
 * It only logs the event.
 *
 * @return always 0
 */
STATIC int diskmd_infolink_add(void *context, uint32_t mask)
{
        (void) mask;
        (void) context;

        DINFO("disk info created\n");

        return 0;
}

/**
 * fnotify "remove" handler registered by diskmd_init() for the "info"
 * directory. Intentionally a no-op.
 *
 * @return always 0
 */
STATIC int diskmd_infolink_rm(void *context, uint32_t mask)
{
        (void) mask;
        (void) context;

        return 0;
}

/**
 * Placeholder: flushing is not implemented. The function asserts
 * unconditionally, then returns 0.
 *
 * @return 0
 */
int diskmd_flush()
{
        YASSERT(0 && "not implimented!");

        return 0;
}

/**
 * Allocate and create an in-memory bitmap sized for disk diskid, with one
 * entry per LICH_CHUNK_SPLIT bytes of disk size.
 *
 * The size comes from disk_getsize(). If that fails (only ENOENT is
 * expected), the size recorded by disk_extinfo_get() is used instead. Any
 * further failure is treated as fatal via UNIMPLEMENTED(__DUMP__).
 *
 * @param diskid disk index
 * @return the new bitmap; the caller owns it
 */
static bmap_t *__diskmd_build_bitmap_create(int diskid)
{
        int ret;
        bmap_t *bmap;
        uint64_t disk_size, size;
        const char *home = __disk_manager__->home;

        ret = disk_getsize(home, diskid, &disk_size);
        if (unlikely(ret)) {
                disk_extinfo_t extinfo;

                YASSERT(ret == ENOENT);

                /* fall back to the size recorded in the extinfo */
                ret = disk_extinfo_get(home, diskid, &extinfo);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                disk_size = extinfo.disk_size;
        }

        ret = ymalloc((void **)&bmap, sizeof(*bmap));
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        size = disk_size / LICH_CHUNK_SPLIT;

        ret = bmap_create(bmap, size);
        if (unlikely(ret))
                UNIMPLEMENTED(__DUMP__);

        return bmap;
}
/**
 * Mapping iterator callback used while rebuilding the bitmaps: mark the
 * location loc->idx as used in the bitmap of disk loc->diskid, creating that
 * bitmap on first use.
 *
 * @param chkid        not used
 * @param pool         not used
 * @param loc          disk id and index of the chunk on that disk
 * @param parent       not used
 * @param meta_version not used
 * @param _arg         bmap_t *[DISK_MAX], one lazily created bitmap per disk
 * @return always 0
 */
static int __diskmd_build_bitmap_itor(const chkid_t *chkid, const char *pool,
                                      const diskloc_t *loc, const chkid_t *parent,
                                      const uint64_t meta_version, void *_arg)
{
        bmap_t **array = _arg;
        bmap_t **slot;

        YASSERT(loc->diskid < DISK_MAX);

        slot = &array[loc->diskid];
        if (*slot == NULL)
                *slot = __diskmd_build_bitmap_create(loc->diskid);

        DINFO("set disk[%u][%u] used\n", loc->diskid, loc->idx);
        bmap_set(*slot, loc->idx);

        return 0;
}

/**
 * Hand every bitmap built during the rebuild to disk_create_bitmap_with() and
 * then destroy and free the in-memory copy.
 *
 * A failure of disk_create_bitmap_with() is fatal (UNIMPLEMENTED(__DUMP__)).
 * The entries of array are not reset by this function.
 *
 * @param array bmap_t *[DISK_MAX]; NULL entries are skipped
 */
static void __diskmd_build_bitmap_flush(bmap_t **array)
{
        int ret, diskid;
        bmap_t *bmap;

        for (diskid = 0; diskid < DISK_MAX; diskid++) {
                bmap = array[diskid];
                if (!bmap)
                        continue;

                ret = disk_create_bitmap_with(diskid, bmap->len, bmap->bits);
                if (unlikely(ret))
                        UNIMPLEMENTED(__DUMP__);

                bmap_destroy(bmap);
                yfree((void **)&bmap);
        }
}


/**
 * Rebuild the per-disk bitmaps from the chunk mapping database, once.
 *
 * The marker file /dev/shm/lich4/bitmap/inited records that the rebuild was
 * done. If it already exists nothing happens. Otherwise every mapping is
 * walked with disk_maping->iterator_new(), the resulting bitmaps are flushed
 * and the marker file is created.
 *
 * NOTE(review): the result of iterator_new() is not examined.
 *
 * @return 0 on success, otherwise an error code
 */
static int __diskmd_build_bitmap()
{
        int ret, fd;
        struct stat stbuf;
        char path[MAX_PATH_LEN];
        bmap_t *array[DISK_MAX];

        snprintf(path, MAX_PATH_LEN, "/dev/shm/lich4/bitmap/inited");

        ret = path_validate(path, YLIB_NOTDIR, YLIB_DIRCREATE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (stat(path, &stbuf) == 0) {
                DINFO("bitmap already inited\n");
                return 0;
        }

        DINFO("rebuild bitmap from database\n");

        memset(array, 0x0, sizeof(array));
        disk_maping->iterator_new(__diskmd_build_bitmap_itor, array,
                                 DM_FLAG_MD | DM_FLAG_RAW);
        __diskmd_build_bitmap_flush(array);

        /* drop the marker so the next start skips the rebuild */
        fd = creat(path, 0644);
        if (unlikely(fd < 0)) {
                ret = errno;
                GOTO(err_ret, ret);
        }

        close(fd);

        return 0;
err_ret:
        return ret;
}

/**
 * Prepare everything the disk manager needs before disks are loaded.
 *
 * In order: initialise the disk slot table, validate the bitmap/, disk/,
 * info/ and block/ directories below home (passing YLIB_DIRCREATE),
 * initialise the chunk mapping at "<ng.home>/chunk", rebuild the bitmaps when
 * gloconf.kv_redis is set, and initialise disk recovery.
 *
 * Note that the chunk mapping path is derived from ng.home, not from the
 * home argument.
 *
 * @param home disk manager home directory
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int diskmd_prep_env(const char *home)
{
        static const char *const subdirs[] = { "bitmap", "disk", "info", "block" };
        int ret;
        size_t i;
        char path[MAX_PATH_LEN];

        ret = disk_slot_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        for (i = 0; i < sizeof(subdirs) / sizeof(subdirs[0]); i++) {
                snprintf(path, MAX_PATH_LEN, "%s/%s", home, subdirs[i]);
                ret = path_validate(path, YLIB_ISDIR, YLIB_DIRCREATE);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        snprintf(path, MAX_PATH_LEN, "%s/chunk", ng.home);

        ret = disk_maping_init(path);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (gloconf.kv_redis) {
                ret = __diskmd_build_bitmap();
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        }

        ret = diskmd_recover_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Look up a pool by name in the manager's pool table. No lock is taken here.
 *
 * @param manager disk manager
 * @param name    pool name; NULL yields "not found"
 * @param pool    out: the matching pool, or NULL when none matches
 * @return always 0
 */
STATIC int diskmd_pool_get_nolock(disk_manager_t *manager, const char *name, pool_t **pool)
{
        int slot;
        pool_t *cand;

        *pool = NULL;

        if (name == NULL)
                return 0;

        for (slot = 0; slot < POOL_MAX; slot++) {
                cand = manager->pool[slot];
                if (cand != NULL && strcmp(cand->name, name) == 0) {
                        *pool = cand;
                        break;
                }
        }

        return 0;
}

/**
 * Drop a pool from this node.
 *
 * Steps: clean the pool's entries from the chunk mapping, detach the pool
 * from the manager table under the write lock, then for every disk listed in
 * the pool stop its recovery and remove it from the pool, and finally free
 * the pool object.
 *
 * NOTE(review): if stopping recovery or removing a disk fails, the function
 * returns with the pool already detached from the manager table but not
 * freed, and the remaining disks untouched - confirm whether that is
 * acceptable.
 *
 * @param pool_name name of the pool to drop
 * @return 0 on success, ENOENT (from diskmd_pool_rm_nolock) if the pool is
 *         not registered, otherwise the first error encountered
 */
int diskmd_pool_cleanup(const char *pool_name)
{
        int i, ret;
        pool_t *pool;
        disk_manager_t *manager = __disk_manager__;

        DINFO("drop pool %s\n", pool_name);

        ANALYSIS_BEGIN(0);

        // TODO too long to delete
        ret = disk_maping->cleanup(pool_name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ANALYSIS_END(0, 1000 * 1000, "disk_cleanup");

        ANALYSIS_BEGIN(1);

        ret = sy_rwlock_wrlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* detach the pool from the table; from here on this function owns it */
        ret = diskmd_pool_rm_nolock(manager, pool_name, &pool);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        sy_rwlock_unlock(&manager->lock);

        /* the per-disk teardown runs without the manager lock */
        for (i = 0; i < DISK_MAX; i ++) {
                if (pool->disk_array[i]) {
                        ret = diskmd_recovery_stop_disk(i);
                        if (unlikely(ret)) {
                                DWARN("disk %d ret %d\n", i, ret);
                                GOTO(err_ret, ret);
                        }

                        ret = diskmd_pool_remove_disk(manager->home, pool, i);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        yfree((void **)&pool);

        ANALYSIS_END(1, 1000 * 1000, "diskmd_pool_remove_disk");
        return 0;
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

/**
 * Register a pool in the first free slot of the manager's pool table and
 * record the slot number in pool->idx. No lock is taken here.
 *
 * @param manager disk manager
 * @param pool    pool to register (must not be NULL)
 * @return 0 on success, EEXIST if a pool with the same name is already
 *         registered, ENOSPC if the table is full
 */
STATIC int diskmd_pool_add_nolock(disk_manager_t *manager, pool_t *pool)
{
        int ret, i, free_slot = -1;
        pool_t *pos;

        YASSERT(pool);

        /* single pass: look for a duplicate and remember the first free slot */
        for (i = 0; i < POOL_MAX; i++) {
                pos = manager->pool[i];
                if (pos == NULL) {
                        if (free_slot < 0)
                                free_slot = i;
                        continue;
                }

                if (strcmp(pos->name, pool->name) == 0) {
                        ret = EEXIST;
                        GOTO(err_ret, ret);
                }
        }

        if (free_slot < 0) {
                ret = ENOSPC;
                GOTO(err_ret, ret);
        }

        pool->idx = free_slot;
        manager->pool[free_slot] = pool;

        return 0;
err_ret:
        return ret;
}

/**
 * Detach the pool called name from the manager's pool table. The pool object
 * itself is not freed. No lock is taken here.
 *
 * @param manager disk manager
 * @param name    name of the pool to detach
 * @param pool    optional out: the detached pool; may be NULL
 * @return 0 on success, ENOENT if no pool has that name
 */
STATIC int diskmd_pool_rm_nolock(disk_manager_t *manager, const char *name, pool_t **pool)
{
        int ret, slot;
        pool_t *pos;

        for (slot = 0; slot < POOL_MAX; slot++) {
                pos = manager->pool[slot];
                if (pos == NULL || strcmp(pos->name, name) != 0)
                        continue;

                if (pool)
                        *pool = pos;

                manager->pool[slot] = NULL;

                return 0;
        }

        ret = ENOENT;
        GOTO(err_ret, ret);

err_ret:
        return ret;
}

/**
 * Attach a loaded disk to the pool called pool_name, creating the pool entry
 * on this node when it does not exist yet, then trigger a node heartbeat.
 *
 * The check that the pool exists cluster-wide is compiled out (#if 1);
 * UNIMPLEMENTED(__WARN__) is issued in its place.
 *
 * @param manager   disk manager
 * @param pool_name pool the disk belongs to
 * @param disk      the disk to attach
 * @return 0 on success, otherwise the first error encountered
 */
STATIC int diskmd_add_disk(disk_manager_t *manager, const char *pool_name, disk_t *disk)
{
        int ret, found = 0;
        pool_t *pool;

#if 1
        (void) found;
        UNIMPLEMENTED(__WARN__);
#else
        ret = system_pool_find(pool_name, &found);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!found) {
                ret = ENOENT;
                DWARN("pool %s don't exist, ret %d\n", pool_name, ret);
                GOTO(err_ret, ret);
        }
#endif

        ret = diskmd_pool_ensure(manager, pool_name);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* the pool was ensured just above, so the lookup is expected to hit */
        diskmd_pool_get_nolock(manager, pool_name, &pool);
        YASSERT(pool);

        ret = diskmd_pool_add_disk(pool, disk, 0);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        sy_rwlock_unlock(&manager->lock);

        node_srv_heartbeat(1);

        return 0;
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

/**
 * Flag the disk as offline by setting __DISK_OFFLINE__ in its status.
 *
 * Per the original note, subsequent I/O on the disk is expected to fail with
 * EIO; that is enforced outside this function.
 *
 * @param disk disk to flag
 */
STATIC void diskmd_set_diskoffline(disk_t *disk)
{
        disk->status = disk->status | __DISK_OFFLINE__;
}

/**
 * Take a disk out of service. Does nothing if the disk is already flagged
 * offline.
 *
 * According to the original author's notes, the following events must lead
 * here:
 * - fnotify on a %.disk link
 * - a bad disk found while loading disks (the %.disk file is rebuilt on
 *   "lich.node --start")
 * - the disk is not writable
 * - write EIO
 * - read EIO
 *
 * Callers in this file hold the disk's slot write lock, see
 * diskmd_set_disklost_with_lock().
 *
 * @see disk_manage.py
 *
 * @param disk disk to take offline (must not be NULL)
 * @param why  reason, used for logging only
 * @return always 0
 */
STATIC int diskmd_set_disklost(disk_t *disk, const char *why)
{
        YASSERT(disk);

        /* already offline: nothing to do */
        if (disk->status & __DISK_OFFLINE__) {
                return 0;
        }

        DWARN("disksm set disk[%u] offline, reason %s op %p %p \n", disk->idx, why, disk, disk->dop);
        SWARN(0, "%s, set disk[%u] offline\n", M_DISK_OFFLINE_WARN, disk->idx);

        diskmd_remove(disk->idx);

        diskmd_set_diskoffline(disk);
        //diskmd_set_chunk_lost(disk);

        /* close the disk through its operation table, if it has one */
        if (disk->dop) {
                disk->dop->close(disk);
        }

        /* start recovery for this disk */
        diskmd_recover_disk(disk->idx);

        DWARN("set disk[%u] offline, reason %s op %p %p \n", disk->idx, why, disk, disk->dop);
        return 0;
}

/**
 * Run diskmd_set_disklost() with the disk's slot write lock held.
 *
 * @param disk disk to take offline
 * @param why  reason, used for logging
 * @return 0 on success, otherwise the error of the lock or of
 *         diskmd_set_disklost()
 */
int diskmd_set_disklost_with_lock(disk_t *disk, const char *why)
{
        int ret = disk_slot_wrlock(disk);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = diskmd_set_disklost(disk, why);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        disk_slot_unlock(disk);

        return 0;
err_lock:
        disk_slot_unlock(disk);
err_ret:
        return ret;
}


/**
 * Make sure a pool called pool_name is registered in the manager, creating
 * and registering it if necessary.
 *
 * The lookup happens under the read lock and the registration under the
 * write lock, so two callers may both decide to create the same pool. The
 * loser of that race gets EEXIST from diskmd_pool_add_nolock(), which is
 * treated as success. Its freshly created pool object is now freed; it used
 * to be leaked.
 *
 * NOTE(review): the pool is released with yfree(), as the existing error
 * path does. If create_pool() acquires more than the pool_t allocation, a
 * dedicated destructor should be used in both places.
 *
 * @param manager   disk manager
 * @param pool_name name of the pool
 * @return 0 if the pool is registered on return, otherwise an error code
 */
STATIC int diskmd_pool_ensure(disk_manager_t *manager, const char *pool_name)
{
        int ret;
        pool_t *pool;

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        diskmd_pool_get_nolock(manager, pool_name, &pool);

        sy_rwlock_unlock(&manager->lock);

        if (pool)
                return 0;       /* already registered */

        ret = create_pool(pool_name, &pool);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_rwlock_wrlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_free, ret);

        ret = diskmd_pool_add_nolock(manager, pool);
        if (unlikely(ret)) {
                if (ret != EEXIST)
                        GOTO(err_unlock, ret);

                /* someone else registered the pool in between: drop our copy */
                sy_rwlock_unlock(&manager->lock);
                yfree((void **)&pool);

                return 0;
        }

        sy_rwlock_unlock(&manager->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_free:
        yfree((void **)&pool);
err_ret:
        return ret;
}

/* Per-disk argument of the disk_load_th() worker. */
typedef struct {
        /* in: disk manager the disk is loaded into */
        disk_manager_t *manager;
        /* in: disk index to load */
        int idx;
        /* out: 0 on success, otherwise the worker's error code */
        int retval;
        /* posted by the worker once retval is set */
        sem_t sem;
} load_arg_t;

/**
 * Worker thread body: create, load and register one disk.
 *
 * The outcome is stored in arg->retval (0 on success) and arg->sem is posted
 * exactly once before the thread returns.
 *
 * @param _arg load_arg_t describing the disk to load
 * @return always NULL
 */
static void *disk_load_th(void *_arg)
{
        load_arg_t *arg = _arg;
        disk_manager_t *manager = arg->manager;
        int idx = arg->idx;
        int ret, bad;
        char pool_name[MAX_NAME_LEN];
        disk_t *disk;

        ret = disk_create(idx, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        /* Original author's note: at load time a disk is added to the cluster
         * whether it loaded cleanly or not; the data of a disk that failed to
         * load gets repaired. */
        ret = disk_load(disk, manager->home, FALSE, pool_name, &bad);
        if (unlikely(ret))
                GOTO(err_free, ret);

        if (likely(!bad)) {
                YASSERT(((disk->size / LICH_CHUNK_SPLIT) / CHAR_BIT) == \
                                (uint64_t)disk->map_size);
        } else {
                /* case 1 (original note): no %.disk link, but %.block exists */
                diskmd_async_push_disk_task(disk, "bad");
        }

        ret = diskmd_add_disk(manager, pool_name, disk);
        if (unlikely(ret))
                GOTO(err_free, ret);

        arg->retval = 0;
        sem_post(&arg->sem);

        return NULL;
err_free:
        yfree((void **)&disk);
err_ret:
        arg->retval = ret;
        sem_post(&arg->sem);

        return NULL;
}

/**
 * Load every disk of this node, one worker thread per disk, and wait for all
 * workers to finish.
 *
 * Original author's notes: <idx>.block is a local file holding the same
 * information as the on-disk header. It exists because the pool of an
 * offline disk cannot be read from the disk itself. A disk is a load
 * candidate when either its %.disk link or its %.block file is present,
 * since only those record the pool it belongs to. The bitmap must exist
 * even for a bad disk, because recovery needs it.
 *
 * A worker that fails to load its disk is only logged; the function still
 * returns 0 in that case.
 *
 * Fixes over the previous version:
 * - if a worker cannot be started, the already running workers are waited
 *   for before the error is returned, because they hold pointers into the
 *   stack-allocated args array;
 * - only workers that were actually started are waited for;
 * - sem_init() failures are reported as errno rather than -1;
 * - sem_wait() is retried on EINTR;
 * - the fallback path is built with snprintf() instead of sprintf().
 *
 * @param manager disk manager
 * @return 0 when all workers were started and waited for, otherwise the error
 *         that prevented a worker from starting
 */
STATIC int diskmd_disk_load(disk_manager_t *manager)
{
        int ret, i, count = 0, launch_err = 0;
        char path[MAX_PATH_LEN];
        load_arg_t args[DISK_MAX], *arg;

        for (i = 0; i < DISK_MAX; i++) {
                snprintf(path, MAX_PATH_LEN, "%s/disk/%d.disk", manager->home, i);
                ret = path_access(path);
                if (unlikely(ret)) {
                        snprintf(path, MAX_PATH_LEN, "%s/block/%d.block", manager->home, i);
                        ret = path_access(path);
                        if (unlikely(ret)) {
                                continue;
                        }
                }

                arg = &args[count];
                arg->idx = i;
                arg->manager = manager;
                arg->retval = 0;

                ret = sem_init(&arg->sem, 0, 0);
                if (unlikely(ret)) {
                        launch_err = errno;
                        break;
                }

                ret = sy_thread_create2(disk_load_th, arg, "disk_load_th");
                if (unlikely(ret)) {
                        launch_err = ret;
                        break;
                }

                /* count only workers that really exist */
                count++;
        }

        for (i = 0; i < count; i++) {
                arg = &args[i];

                do {
                        ret = sem_wait(&arg->sem);
                } while (unlikely(ret != 0 && errno == EINTR));

                if (unlikely(ret)) {
                        /* returning now would free args under a live worker */
                        UNIMPLEMENTED(__DUMP__);
                }

                ret = arg->retval;
                if (unlikely(ret)) {
                        DERROR("disk %d load fail!!! ret %d\n", arg->idx, ret);
                }
        }

        if (unlikely(launch_err)) {
                ret = launch_err;
                GOTO(err_ret, ret);
        }

        DINFO("disk load finish\n");
        diskmd_update_tier();

        return 0;
err_ret:
        return ret;
}

/**
 * Load the persistent state of the disk manager; currently that is only the
 * disks, via diskmd_disk_load().
 *
 * @return 0 on success, otherwise the error of diskmd_disk_load()
 */
STATIC int diskmd_load(disk_manager_t *manager)
{
        int ret = diskmd_disk_load(manager);

        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

STATIC int diskmd_init(const char *home, uint64_t *max_chunk)
{
        int ret;
        char path[MAX_PATH_LEN];
        disk_manager_t *manager = NULL;
        uint64_t disk_free, disk_total;

        DINFO("disk init\n");

        (void) home;
        (void) max_chunk;

        ret = diskmd_async_init();
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = ymalloc((void **)&manager, sizeof(disk_manager_t));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        snprintf(manager->home, MAX_PATH_LEN, "%s/disk", ng.home);

        ret = sy_rwlock_init(&manager->lock, "disk_manager.lock");
        if (unlikely(ret))
                GOTO(err_free, ret);

        YASSERT(__disk_manager__ == NULL);
        __disk_manager__ = manager;
        
        ret = diskmd_prep_env(manager->home);
        if (unlikely(ret))
                GOTO(err_free, ret);

        if(gloconf.nvme) {
                ret = nvme_lib_init(NVME_LOG_ERR, NVME_LOG_FILE, "/opt/fusionstack/log/nvme.log");
                if (unlikely(ret)) {
                        DERROR("nvme lib init failed\n");
                        GOTO(err_free, ret);
                }
        }

        ret = diskmd_load(manager);
        if (unlikely(ret))
                GOTO(err_free, ret);

        // 磁盘软链接
        snprintf(path, MAX_PATH_LEN, "%s/disk", manager->home);
        ret = fnotify_register(path, diskmd_disklink_add, diskmd_disklink_rm, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        // 磁盘info文件
        snprintf(path, MAX_PATH_LEN, "%s/info", manager->home);
        ret = fnotify_register(path, diskmd_infolink_add, diskmd_infolink_rm, NULL);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = diskmd_node_dfree1(&disk_total, &disk_free);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *max_chunk = disk_total * 2;

        return 0;
err_free:
        yfree((void **)&manager);
err_ret:
        return ret;
}

/* Shut down the chunk mapping backend by invoking disk_maping->close(). */
void diskmd_close()
{
        disk_maping->close();
}

/**
 * Query whether a disk is currently available.
 *
 * @param diskid  slot index of the disk
 * @param online  out: result of disk_avaiable() sampled under the read lock
 * @return 0 on success, otherwise the error from the slot lookup or lock
 */
int diskmd_online(int diskid, int *online)
{
        int ret;
        disk_t *slot;

        ret = disk_slot_get(diskid, &slot);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_slot_rdlock(slot);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* read the state while the slot is read-locked */
        *online = disk_avaiable(slot);
        disk_slot_unlock(slot);

        disk_slot_release(diskid);

        return 0;
err_release:
        disk_slot_release(diskid);
err_ret:
        return ret;
}

/**
 * Bring an offline disk back online.
 *
 * Under the disk's write lock: if the disk is flagged __DISK_OFFLINE__,
 * reload it when it has no operation table (dop == NULL), open it when it
 * has no fd, then clear the offline flag.
 *
 * @param diskid   slot index of the disk
 * @param changed  out: 1 when the disk transitioned to online, else 0
 * @return 0 on success (including "already online"); EINVAL when the
 *         reloaded disk is reported bad; otherwise the failing step's error
 */
int diskmd_set_online(int diskid, int *changed)
{
        int ret;
        disk_manager_t *manager = __disk_manager__;
        disk_t *disk;

        *changed = 0;

        ret = disk_slot_get(diskid, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = sy_rwlock_wrlock(&disk->lock);
        if (unlikely(ret))
                GOTO(err_release, ret);

        if (disk->status & __DISK_OFFLINE__) {
                DINFO("disksm set disk %d online, disk %p op %p disk_fd %p\n",
                      diskid, disk, disk->dop, disk->disk_fd);

                YASSERT(disk->disk_fd == NULL);

                struct timeval t1, t2, t3;

                _gettimeofday(&t1, NULL);

                // load bad disk
                if (disk->dop == NULL) {
                        char pool[MAX_NAME_LEN];
                        int bad = 0;

                        disk_close_bitmap(disk);

                        ret = disk_load(disk, manager->home, FALSE, pool, &bad);
                        if (unlikely(ret))
                                GOTO(err_lock, ret);

                        /*
                         * A bad disk must fail the call with a real error;
                         * previously this path left via ret == 0 and the
                         * caller saw success with *changed == 0.
                         */
                        if (unlikely(bad)) {
                                ret = EINVAL;
                                DERROR("disksm load bad disk %d ret %d\n", disk->idx, ret);
                                GOTO(err_lock, ret);
                        }
                }

                YASSERT(disk->dop != NULL);

                _gettimeofday(&t2, NULL);

                if (disk->disk_fd == NULL) {
                        char pool[MAX_NAME_LEN];
                        uint64_t disk_size;

                        ret = disk->dop->open(disk, manager->home, pool, &disk_size);
                        if (unlikely(ret)) {
                                DERROR("open disk %d failed\n", disk->idx);
                                GOTO(err_lock, ret);
                        }
                }

                disk->status &= ~__DISK_OFFLINE__;

                *changed = 1;

                _gettimeofday(&t3, NULL);

                /* t1..t2: reload phase, t2..t3: open phase */
                int64_t used1 = _time_used(&t1, &t2);
                int64_t used2 = _time_used(&t2, &t3);

                DINFO("disksm set disk %d online, used %jd %jd\n", disk->idx, used1, used2);
        }

        disk_slot_unlock(disk);
        disk_slot_release(diskid);

        return 0;
err_lock:
        disk_slot_unlock(disk);
err_release:
        disk_slot_release(diskid);
err_ret:
        return ret;
}

/**
 * Fetch the storage tier of a disk.
 *
 * @param diskid  slot index of the disk
 * @param tier    out: disk->tier read under the slot read lock
 * @return 0 on success, otherwise the error from the slot lookup or lock
 */
int diskmd_gettier(int diskid, int *tier)
{
        int ret;
        disk_t *slot;

        ret = disk_slot_get(diskid, &slot);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_slot_rdlock(slot);
        if (unlikely(ret))
                GOTO(err_release, ret);

        *tier = slot->tier;
        disk_slot_unlock(slot);

        disk_slot_release(diskid);

        return 0;
err_release:
        disk_slot_release(diskid);
err_ret:
        return ret;
}

/**
 * Fetch the cache attributes of a disk.
 *
 * @param diskid  slot index of the disk
 * @param cache   out: disk->cache
 * @param cached  out: disk->cached
 * @return 0 on success, otherwise the error from the slot lookup or lock
 */
int diskmd_get_cache(int diskid, int *cache, int *cached)
{
        int ret;
        disk_t *slot;

        ret = disk_slot_get(diskid, &slot);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_slot_rdlock(slot);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* both fields are copied out under the same read lock */
        *cache = slot->cache;
        *cached = slot->cached;
        disk_slot_unlock(slot);

        disk_slot_release(diskid);

        return 0;
err_release:
        disk_slot_release(diskid);
err_ret:
        return ret;
}

/* Lookup context handed to diskid_get_poolname() through the pool iterator. */
typedef struct {
        int diskid;     /* in: disk index to search for */
        int valid;      /* out: set to 1 when a pool containing diskid was found */
        char *poolname; /* out: caller buffer; the pool name is strcpy'd into it */
} disk_to_poolname_t;


/**
 * Pool-iterator callback: if the pool owns the requested disk, copy the
 * pool's name into the query and mark the query as valid.
 *
 * @param _pool  pool_t being visited
 * @param _arg   disk_to_poolname_t query
 * @return always 0, so the iteration visits every pool
 */
STATIC int diskid_get_poolname(void *_pool, void *_arg)
{
        disk_to_poolname_t *query = _arg;
        pool_t *candidate = _pool;

        /* out-of-range ids can never match a disk_array entry */
        if (query->diskid < 0 || query->diskid >= DISK_MAX)
                return 0;

        if (!candidate->disk_array[query->diskid])
                return 0;

        strcpy(query->poolname, candidate->name);
        query->valid = 1;

        return 0;
}

int diskmd_get_pool(int diskid, char *pool_name)
{
        int ret;
        disk_manager_t *manager = __disk_manager__;

        if (!manager) {
                DWARN("disk_manager uninited\n");
                return EPERM;
        }

        disk_to_poolname_t arg;
        arg.diskid = diskid;
        arg.poolname = pool_name;
        arg.valid = 0;

        ret = diskmd_pool_iterator(diskid_get_poolname, &arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!arg.valid) {
                ret = ENOENT;
                GOTO(err_ret, ret);
        }

        return 0;
err_ret:
        return ret;
}


static int __diskmd_set_deleting(uint32_t diskid, int deleting)
{
        int ret;
        disk_t *disk;
        uint32_t mode;

        DINFO("set disk %d deleting %d\n", diskid, deleting);
        if (deleting)
                mode = __NODE_STAT_DELETING__;
        else
                mode = 0;

        ret = disk_slot_get(diskid, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        {
                ret = disk_slot_rdlock(disk);
                if (unlikely(ret))
                        GOTO(err_release, ret);

                if ((disk->status & __DISK_DELETING__) == mode) {
                } else if (deleting) {
                        disk->status |= __DISK_DELETING__;
                } else {
                        disk->status &= ~__DISK_DELETING__;
                }

                disk_slot_unlock(disk);
        }

        disk_slot_release(diskid);

        return 0;
err_release:
        disk_slot_release(diskid);
err_ret:
        return ret;
}

/**
 * Set or clear the deleting flag on one disk, or on every disk slot when
 * diskid is DISK_ALL_IDX.
 *
 * In the "all disks" mode an ENODEV result for a slot is ignored; any
 * other error hits UNIMPLEMENTED(__DUMP__).
 *
 * @return 0 on success, or the error for the single-disk case
 */
int diskmd_set_deleting(uint32_t diskid, int deleting)
{
        int ret;
        uint32_t slot;

        if (diskid != DISK_ALL_IDX) {
                ret = __diskmd_set_deleting(diskid, deleting);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                return 0;
        }

        for (slot = 0; slot < DISK_MAX; slot++) {
                ret = __diskmd_set_deleting(slot, deleting);
                if (unlikely(ret) && ret != ENODEV)
                        UNIMPLEMENTED(__DUMP__);
        }

        return 0;
err_ret:
        return ret;
}

/* Accumulator passed to diskmd_pool_getcapacity_cb() while walking a pool. */
typedef struct {
        int tier;       /* in: only disks whose tier equals this are counted */
        uint64_t used;  /* out: sum of disk->bmap.nr_one over counted disks */
        uint64_t total; /* out: sum of disk->bmap.size over counted disks */
} capacity_arg_t;

/**
 * Per-disk callback for diskmd_pool_getcapacity(): add the disk's bitmap
 * size and set-bit count to the accumulator when the disk is in the
 * requested tier and its cache value is not 100.
 *
 * @return always 0
 */
int diskmd_pool_getcapacity_cb(void *_pool, void *_disk, void *_arg)
{
        capacity_arg_t *acc = _arg;
        disk_t *disk = _disk;

        (void) _pool;

        if (disk->tier != acc->tier) {
                DBUG("disk[%u] need %d got %d\n", disk->idx, acc->tier, disk->tier);
                return 0;
        }

        /* disks with cache == 100 are excluded from the capacity figures */
        if (disk->cache != 100) {
                acc->total += disk->bmap.size;
                acc->used += disk->bmap.nr_one;
        }

        return 0;
}

int diskmd_pool_getcapacity(const char *pool_name, int tier, uint64_t *_total, uint64_t *_used)
{
        int ret;

        capacity_arg_t arg = {
                .used = 0,
                .total = 0,
                .tier = tier,
        };

        ret = diskmd_disk_iterator_by_pool(pool_name, diskmd_pool_getcapacity_cb, &arg);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        *_total = arg.total;

        if (_used)
                *_used = arg.used;

        return 0;
err_ret:
        return ret;
}

/**
 * Remove a disk by unlinking its "<home>/disk/<idx>.disk" symlink.
 *
 * The path is built with a bounded snprintf: manager->home alone may be
 * MAX_PATH_LEN bytes, so the previous unbounded sprintf could overflow
 * the stack buffer. If the path does not fit, nothing is unlinked.
 *
 * @param idx  disk index; must be below DISK_MAX
 */
void diskmd_remove(int idx)
{
        int len;
        char path[MAX_PATH_LEN];

        DWARN("remove disk[%u]\n", idx);
        SWARN(0, "%s, remove disk[%u]\n", M_DISK_OFFLINE_WARN, idx);

        YASSERT(idx < DISK_MAX);

        len = snprintf(path, sizeof(path), "%s/disk/%d.disk", __disk_manager__->home, idx);
        if (unlikely(len < 0 || (size_t)len >= sizeof(path))) {
                /* a truncated path would name the wrong file */
                DERROR("disk[%d] link path too long, skip unlink\n", idx);
                return;
        }

        // TODO if NVMe
        _unlink(path, "diskmd_remove");
}

/**
 * Destroy a disk through its operation table (disk->dop->destroy).
 *
 * NOTE(review): disk->dop is dereferenced without a NULL check; confirm
 * callers never reach this for a disk whose dop is unset.
 *
 * @param idx  slot index of the disk
 * @return 0 on success, otherwise the slot lookup or destroy error
 */
int diskmd_destroy(int idx)
{
        int ret;
        disk_t *target;

        ret = disk_slot_get(idx, &target);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = target->dop->destroy(target);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* success: drop the reference taken by disk_slot_get */
        disk_slot_release(idx);

        return 0;
err_release:
        disk_slot_release(idx);
err_ret:
        return ret;
}

/**
 * Remove a disk from a named pool while holding the manager read lock.
 *
 * @param manager    disk manager owning the pools
 * @param pool_name  pool the disk belongs to
 * @param idx        disk index to remove
 * @return 0 on success; ENOENT when the pool lookup yields no pool without
 *         reporting an error itself; otherwise the failing step's error
 */
STATIC int diskmd_remove_disk_direct(disk_manager_t *manager, const char *pool_name, int idx)
{
        int ret;
        pool_t *pool = NULL;    /* defined value even if the lookup never writes it */

        DWARN("disk %d\n", idx);

        ret = sy_rwlock_rdlock(&manager->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = diskmd_pool_get_nolock(manager, pool_name, &pool);
        if (unlikely(!pool)) {
                /* a missing pool must not be reported as success */
                if (!ret)
                        ret = ENOENT;
                GOTO(err_unlock, ret);
        }

        ret = diskmd_pool_remove_disk(manager->home, pool, idx);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        sy_rwlock_unlock(&manager->lock);

        return 0;
err_unlock:
        sy_rwlock_unlock(&manager->lock);
err_ret:
        return ret;
}

/*
 * Pending-removal record queued by diskmd_writeable_cb() and consumed by
 * diskmd_writeable(). `hook` must stay the first member: the consumer
 * casts the list node pointer directly to disk_remove_t *.
 */
typedef struct {
        struct list_head hook;          /* list linkage */
        char pool_name[MAX_NAME_LEN];   /* pool the disk belongs to */
        int idx;                        /* disk index to remove */
} disk_remove_t;

/*
 * In single-node mode a disk cannot be removed: it can neither be
 * recovered nor deleted.
 *
 * Pool-iterator callback behind diskmd_writeable(). For every disk of the
 * pool:
 *   - available disk: probe dop->writeable(); a zero result queues an
 *     async "writeable" task for the disk;
 *   - unavailable disk: on every 60th visit count the chunks still mapped
 *     to it, and once that count is 0 queue a disk_remove_t on the list
 *     passed in _arg.
 *
 * Returns 0, or the error from the chunk count / allocation.
 */
STATIC int diskmd_writeable_cb(void *_pool, void *_arg)
{
        int ret, j, avail;
        uint64_t chunk;
        disk_t *disk;
        pool_t *pool = _pool;

        // __count for every disk
        static uint64_t __count[DISK_MAX] = {0};

        struct list_head *list = _arg;
        disk_remove_t *remove_disk = NULL;

        DBUG("pool %s\n", pool->name);

        for (j = 0; j < DISK_MAX; j++) {
                if (unlikely(pool->disk_array[j])) {
                        ret = disk_slot_get(j, &disk);
                        if (unlikely(ret)) {
                                YASSERT(0 && "why ?");
                        }

                        ret = disk_slot_rdlock(disk);
                        if (unlikely(ret)) {
                                /* give back the reference taken by
                                 * disk_slot_get before skipping the disk */
                                disk_slot_release(j);
                                continue;
                        }

                        avail = disk_avaiable(disk);

                        disk_slot_unlock(disk);

                        if (avail) {
                                ret = disk->dop->writeable(disk);
                                if (unlikely(!ret)) {
                                        diskmd_async_push_disk_task(disk, "writeable");
                                }
                        } else {
                                // TODO avoid the overhead of repeated calls
                                if (__count[j] % 60 == 0) {
                                        ret = disk_maping->count(disk->idx, &chunk);
                                        if (unlikely(ret))
                                                GOTO(err_ret, ret);

                                        DINFO("count %ju pool %s disk %d status %d chunk %ju\n",
                                              __count[j], pool->name, disk->idx, disk->status, chunk);

                                        // TODO chunk == 1 keeps the disk from being kicked out
                                        // of the cluster; health then reports it as offline.
                                        // disk->idx need not be contiguous.
                                        // Manually deleting the related records from the db
                                        // resolves it.
                                        if (chunk == 0) {
                                                DINFO("disk %d recovery finish\n", j);
                                                ret = ymalloc((void **)&remove_disk, sizeof(disk_remove_t));
                                                if (unlikely(ret))
                                                        GOTO(err_ret, ret);

                                                remove_disk->idx = j;
                                                strcpy(remove_disk->pool_name, pool->name);
                                                list_add(&remove_disk->hook, list);
                                        }
                                }

                                __count[j]++;
                        }

                        disk_slot_release(j);
                }
        }

        diskmd_pool_dump(pool);
        return 0;
err_ret:
        disk_slot_release(j);
        return ret;
}

int diskmd_writeable()
{
        int ret;
        disk_remove_t *remove_disk = NULL;
        struct list_head list, *pos, *n;
        disk_manager_t *manager = __disk_manager__;

        DBUG("start\n");

        if (!__disk_manager__) {
                DWARN("diskmd uninited\n");
                return 0;
        }

        INIT_LIST_HEAD(&list);

        ret = diskmd_pool_iterator(diskmd_writeable_cb, &list);
        if (unlikely(ret)) {
                DERROR("ret %d\n", ret);
        }

        list_for_each_safe(pos, n, &list) {
                remove_disk = (disk_remove_t *) pos;

                /**
                 * @todo if diskmd_recovery is running
                 */
                ret = diskmd_remove_disk_direct(manager, remove_disk->pool_name, remove_disk->idx);
                if (unlikely(ret)) {
                        DWARN("pool %s disk %d remove fail %d\n",
                              remove_disk->pool_name, remove_disk->idx, ret);
                }

                list_del_init(pos);
                yfree((void **)&pos);
        }

        return 0;
}

/**
 * Latency reporting hook. Both arguments are ignored and no state is
 * touched.
 *
 * @return always 0
 */
int diskmd_update_latency(int diskid, uint64_t latency)
{
        /* no-op: silence unused-parameter warnings */
        (void) latency;
        (void) diskid;

        return 0;
}

void diskmd_exists(const diskloc_t *loc, int _exist)
{
        int ret;
        disk_t *disk;

        ret = disk_slot_get(loc->idx, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_exists(disk, loc, _exist);
        if (unlikely(ret))
                GOTO(err_release, ret);

        disk_slot_release(loc->idx);

        return;
err_release:
        disk_slot_release(loc->idx);
err_ret:
        return;
}

/**
 * Release a chunk location on its disk via disk_delete().
 *
 * @param loc  location to delete; loc->diskid selects the disk slot
 * @return 0 on success, otherwise the slot lookup or disk_delete error
 */
int diskmd_delete(const diskloc_t *loc)
{
        int ret;
        disk_t *owner;

        ret = disk_slot_get(loc->diskid, &owner);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_delete(owner, loc);
        if (unlikely(ret))
                GOTO(err_release, ret);

        /* success: drop the slot reference */
        disk_slot_release(loc->diskid);

        return 0;
err_release:
        disk_slot_release(loc->diskid);
err_ret:
        return ret;
}

/**
 * Look up the mapping of a chunk through disk_maping->load().
 *
 * @param chkid   chunk to look up
 * @param loc     out: disk location of the chunk
 * @param parent  out: parent chunk id; asserted to have a non-zero id
 * @param pool    out: pool name
 * @return 0 on success, otherwise the error from the mapping backend
 */
int diskmd_chunk_load(const chkid_t *chkid, diskloc_t *loc, chkid_t *parent,
                      char *pool)
{
        int ret;

        ANALYSIS_BEGIN(0);

        ret = disk_maping->load(chkid, loc, parent, pool);
        if (unlikely(ret)) {
                /* propagate the lookup error to the caller */
                return ret;
        }

        YASSERT(parent->id);

#if ENABLE_BMAP_DEBUG
        diskmd_exists(loc, 1);
#endif

        ANALYSIS_QUEUE(0, IO_WARN, "diskmd_chunk_load");

        return 0;
}

#if DISK_MAPING_ITERATOR_NEW
#else
/**
 * Iterate chunk records of a table starting at a cursor.
 *
 * Only the sqlite3 backend is supported; with gloconf.kv_redis set the
 * call hits UNIMPLEMENTED(__DUMP__). The sqlite3 iterator is called as a
 * plain statement: a `return <expr>;` in a void function is a constraint
 * violation in ISO C.
 */
void diskmd_chunk_iterator_cursor(const char *table, int cursor, const char *condition, func_va_t func, void *arg)
{
        if (gloconf.kv_redis) {
                UNIMPLEMENTED(__DUMP__);
        } else {
                disk_sqlite3_iterator_cursor(table, cursor, condition, func, arg);
        }
}
#endif

/**
 * Allocate chunk locations from a pool, restricted to one tier.
 *
 * @param pool_name  pool to allocate from
 * @param locs       out: array receiving the allocated locations
 * @param loc_count  number of locations requested
 * @param tier       tier the locations must come from
 * @return 0 on success; ENOSPC when the pool is not found; otherwise the
 *         lock or allocation error
 */
int diskmd_create_with_tier(const char *pool_name, diskloc_t *locs, int loc_count, int tier)
{
        int ret;
        pool_t *pool;
        disk_manager_t *mgr = __disk_manager__;

        ANALYSIS_BEGIN(0);

        /* the manager read lock guards against pool add/remove */
        ret = sy_rwlock_rdlock(&mgr->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = diskmd_pool_get_nolock(mgr, pool_name, &pool);
        if (unlikely(!pool)) {
                ret = ENOSPC;
                GOTO(err_unlock, ret);
        }

        ret = diskmd_pool_get_empty_with_tier(pool, locs, loc_count, tier);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        sy_rwlock_unlock(&mgr->lock);

        ANALYSIS_QUEUE(0, IO_WARN, "diskmd_create");

        return 0;
err_unlock:
        sy_rwlock_unlock(&mgr->lock);
err_ret:
        return ret;
}

/**
 * Allocate chunk locations from a pool, letting the pool pick the tier.
 *
 * @param pool_name   pool to allocate from
 * @param locs        out: array receiving the allocated locations
 * @param locs_count  number of locations requested
 * @param _tier       out (optional, may be NULL): tier that was chosen
 * @return 0 on success; ENOSPC when the pool is not found; otherwise the
 *         lock or allocation error
 */
int diskmd_create_direct(const char *pool_name, diskloc_t *locs, int locs_count, int *_tier)
{
        int ret;
        int chosen = 0;
        pool_t *pool;
        disk_manager_t *mgr = __disk_manager__;

        ANALYSIS_BEGIN(0);

        ret = sy_rwlock_rdlock(&mgr->lock);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = diskmd_pool_get_nolock(mgr, pool_name, &pool);
        if (unlikely(!pool)) {
                ret = ENOSPC;
                GOTO(err_unlock, ret);
        }

        ret = diskmd_pool_get_empty(pool, locs, locs_count, &chosen);
        if (unlikely(ret))
                GOTO(err_unlock, ret);

        sy_rwlock_unlock(&mgr->lock);

        /* the tier is reported only when the caller asked for it */
        if (_tier)
                *_tier = chosen;

        ANALYSIS_QUEUE(0, IO_WARN, "diskmd_create");

        return 0;
err_unlock:
        sy_rwlock_unlock(&mgr->lock);
err_ret:
        return ret;
}

/**
 * Allocate chunk locations, honouring an optional tier preference.
 *
 * priority == -1: no preference, the pool chooses the tier.
 * priority == 0 or 1: try that tier first and fall back to an unrestricted
 * allocation when the tier reports ENOSPC.
 *
 * @param _tier  out (required): tier the locations were taken from
 * @return 0 on success, otherwise the allocation error
 */
int diskmd_create(const char *pool, diskloc_t *locs, int loc_count, int *_tier, int priority)
{
        int ret, tier;

        ANALYSIS_BEGIN(0);

        if (priority == -1) {
                ret = diskmd_create_direct(pool, locs, loc_count, &tier);
                if (unlikely(ret))
                        GOTO(err_ret, ret);
        } else {
                YASSERT(priority == 0 || priority == 1);

                tier = priority;
                ret = diskmd_create_with_tier(pool, locs, loc_count, tier);
                if (unlikely(ret)) {
                        /* only a full tier triggers the fallback */
                        if (ret != ENOSPC)
                                GOTO(err_ret, ret);

                        ret = diskmd_create_direct(pool, locs, loc_count, &tier);
                        if (unlikely(ret))
                                GOTO(err_ret, ret);
                }
        }

        ANALYSIS_QUEUE(0, IO_WARN, "disk_create");

        *_tier = tier;

        return 0;
err_ret:
        return ret;
}

/**
 * Delete a chunk's mapping record, optionally guarded by a version check.
 *
 * Unless _meta_version is (LLU)-1, the stored meta version is fetched
 * first and the delete is refused with EPERM when the caller's version is
 * older than the stored one.
 *
 * @return 0 on success; EPERM on a stale version; otherwise the mapping
 *         backend's error
 */
static int __diskmd_del__(const chkid_t *chkid, uint64_t _meta_version, const diskloc_t *loc)
{
        int ret;
        uint64_t stored;
        int check_version = (_meta_version != (LLU)-1);

        if (check_version) {
                ret = disk_maping->getmetaversion(chkid, &stored);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

                DBUG("chunk "CHKID_FORMAT" meta_version %llu %llu\n",
                     CHKID_ARG(chkid), (LLU)stored, (LLU)_meta_version);

                if (_meta_version < stored) {
                        ret = EPERM;
                        GOTO(err_ret, ret);
                }
        }

        ret = disk_maping->del(chkid, loc, 1);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

/**
 * Unlink a chunk: drop it from the clock cache, delete its mapping record
 * (subject to the meta-version check) and release its disk location.
 *
 * ENOKEY/ENOENT from clock_remove() and ENODEV from diskmd_delete() are
 * tolerated.
 *
 * @return 0 on success, otherwise the first non-tolerated error
 */
int diskmd_unlink(const chkid_t *chkid, const diskloc_t *loc, uint64_t meta_version)
{
        int ret;
#if ENABLE_BMAP_DEBUG
        diskmd_exists(loc, 1);
#endif

        UNIMPLEMENTED(__NULL__); //need transaction

        ret = clock_remove(chkid);
        /* a missing cache entry is fine: the chunk may be dirty */
        if (unlikely(ret) && ret != ENOKEY && ret != ENOENT)
                GOTO(err_ret, ret);

        ret = __diskmd_del__(chkid, meta_version, loc);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        DBUG("delete "CHKID_FORMAT" loc "LOC_FORMAT"\n", CHKID_ARG(chkid), LOC_ARG(loc));

        ret = diskmd_delete(loc);
        /* ENODEV: nothing to do */
        if (unlikely(ret) && ret != ENODEV)
                GOTO(err_ret, ret);

        return 0;
err_ret:
        return ret;
}

int diskmd_create_init(const chkid_t *chkid, const diskloc_t *loc, int zero, const buffer_t *initdata)
{
        int ret;
        buffer_t buf;

#if ENABLE_BMAP_DEBUG
        diskmd_exists(loc, 1);
#endif

        ANALYSIS_BEGIN(0);

        /*
         *  zero 参数是代表size(size_t),
         *  即需要清零的区域大小(元数据某些部分需要清零)
         */
        YASSERT(zero <= (int)LICH_CHUNK_SPLIT);

        if (zero) {
                mbuffer_init(&buf, 0);
                mbuffer_appendzero(&buf, zero);

                ANALYSIS_BEGIN(1);

                ret = diskmd_aio_write(chkid, loc, &buf, 0, 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                mbuffer_free(&buf);

                char *tmp, _tmp[MAX_NAME_LEN];
                tmp = _tmp;
                snprintf(tmp, MAX_NAME_LEN, "disk_write[%u]", loc->diskid);

                ANALYSIS_END(1, IO_WARN, tmp);
        }

        if (initdata) {
                DBUG("init disk %u offset %llu\n", loc->diskid, (LLU)loc->idx);
                ANALYSIS_BEGIN(0);

                ret = diskmd_aio_write(chkid, loc, initdata, 0, 0);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                char *tmp1, _tmp1[MAX_NAME_LEN];
                tmp1 = _tmp1;
                snprintf(tmp1, MAX_NAME_LEN, "disk_write[%u]",  loc->diskid);

                ANALYSIS_END(0, IO_WARN, tmp1);
        }

        ANALYSIS_QUEUE(0, IO_WARN, "disk_create_init");

        // disk_getfd return sync fd, so don't need sync
        // disk_aio_sync(fd, 0);

        return 0;
err_free:
        mbuffer_free(&buf);
        return ret;
}

/*
 * IO
 */

/*
 * Take a disk out of service after an I/O failure: privately connected
 * disks are disconnected in place, all others get an async "aio_close"
 * task queued.
 */
static void __diskmd_close(disk_t *disk)
{
        if (!disk->private) {
                diskmd_async_push_disk_task(disk, "aio_close");
                return;
        }

        disk_slot_private_disconnect(disk->idx);
}

/*
 * Obtain the disk slot for an I/O: raw chunks go through the private
 * connection, every other chunk type through the shared slot lookup.
 * Returns the result of whichever lookup was used.
 */
static int __diskmd_slot_connect(const chkid_t *chkid, const diskloc_t *loc, disk_t **disk)
{
        if (unlikely(chkid->type != __RAW_CHUNK__))
                return disk_slot_get(loc->diskid, disk);

        return disk_slot_private_connect(loc->diskid, disk);
}

/*
 * Counterpart of __diskmd_slot_connect(): release the slot through the
 * private or the shared path depending on disk->private.
 */
static void __diskmd_slot_release(const disk_t *disk)
{
        if (unlikely(!disk->private)) {
                disk_slot_release(disk->idx);
                return;
        }

        disk_slot_private_release(disk->idx);
}

#if ENABLE_ALIGN_NEW

/*
 * Read from a chunk through the disk's aio_readv operation.
 *
 * The on-disk offset is loc->idx * LICH_CHUNK_SPLIT plus _offset, the
 * offset inside the chunk. Returns 0 or the error from aio_readv.
 */
static int IO_FUNC __diskmd_aio_read(disk_t *disk, const chkid_t *chkid,
                                     const diskloc_t *loc, buffer_t *buf,
                                     int _offset, int prio)
{
        int ret;
        uint64_t chunk_base = (uint64_t)loc->idx * LICH_CHUNK_SPLIT;
        uint64_t disk_off = chunk_base + _offset;

        DISKMD_ANALYSIS_BEGIN(0);

        YASSERT(chkid);

        ret = disk->dop->aio_readv(disk, chkid, buf, disk_off, prio);
        if (unlikely(ret)) {
                DERROR("disk[%u] ret %d %s\n", loc->diskid, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        DISKMD_ANALYSIS_UPDATE(0, loc, IO_WARN, "disk_aio_read");

        return 0;
err_ret:
        return ret;
}

#else

/*
 * Read from a chunk, aligning the request when required.
 *
 * Alignment applies to non-raw chunks, and to raw chunks when
 * gloconf.read_modify_write is set. An unaligned request is translated
 * into a temporary buffer (buffer_align_trans), read from disk, and the
 * requested window is then copied back into the caller's single segment.
 *
 * The temporary buffer is freed on error only when it was actually
 * created. Without a translation newbuf aliases the caller's buf, which
 * stays owned by the caller; this matches the `trans` guard in the write
 * path.
 *
 * Returns 0, or the error from the translation / aio_readv.
 */
static int IO_FUNC __diskmd_aio_read(disk_t *disk, const chkid_t *chkid,
                                     const diskloc_t *loc, buffer_t *buf,
                                     int _offset, int prio)
{
        int ret, trans = 0;
        seg_t *seg;
        uint64_t newoff;
        buffer_t *newbuf, __tmp;
        char tmp[PAGE_SIZE];

        //_offset is the offset within the chunk
        uint64_t offset = (uint64_t)loc->idx * LICH_CHUNK_SPLIT;
        uint64_t real_offset = offset + _offset;

        //diskmd_disk_write_check(loc->diskid);

        DISKMD_ANALYSIS_BEGIN(0);

        YASSERT(chkid);
        int align = 0;
        align = (chkid->type != __RAW_CHUNK__
                 || (chkid->type == __RAW_CHUNK__
                     && gloconf.read_modify_write));

        if (unlikely(align && !is_aligned(buf, real_offset))) {
                trans = 1;

                newbuf = &__tmp;
                ret = buffer_align_trans(buf, real_offset, newbuf, &newoff);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

#if ENABLE_CHUNK_DEBUG
                DINFO("aio align read "CHKID_FORMAT" disk %u idx %u "
                                "raw (size %u chunk offset %d disk offset %ju) "
                                "align (size %u disk offset %ju)\n",
                                CHKID_ARG(chkid), loc->diskid, loc->idx,
                                buf->len, _offset, real_offset,
                                newbuf->len, newoff
                    );
#else
                DBUG("aio align read "CHKID_FORMAT" disk %u idx %u "
                                "raw (size %u chunk offset %d disk offset %ju) "
                                "align (size %u disk offset %ju)\n",
                                CHKID_ARG(chkid), loc->diskid, loc->idx,
                                buf->len, _offset, real_offset,
                                newbuf->len, newoff
                    );

#endif
        } else {
                newbuf = buf;
                newoff = real_offset;
        }

        YASSERT(real_offset >= newoff);

        ret = disk->dop->aio_readv(disk, chkid, newbuf, newoff, prio);
        if (unlikely(ret)) {
                DERROR("disk[%u] ret %d %s\n", loc->diskid, ret, strerror(ret));
                GOTO(err_free, ret);
        }

        if (unlikely(trans)) {
                /* discard the leading padding introduced by the alignment */
                YASSERT(real_offset - newoff < PAGE_SIZE);
                ret = mbuffer_popmsg(newbuf, tmp, real_offset - newoff);
                if (unlikely(ret))
                        YASSERT(0 && "why?");

                /* copy the requested window into the caller's only segment */
                seg = (seg_t *)buf->list.next;
                YASSERT(buf->len == seg->len);
                ret = mbuffer_popmsg(newbuf, seg->handler.ptr, buf->len);
                if (unlikely(ret))
                        YASSERT(0 && "why?");

                DBUG("mbuffer_free len %u\n", newbuf->len);
                mbuffer_free(newbuf);
        }

        DISKMD_ANALYSIS_UPDATE(0, loc, IO_WARN, "disk_aio_read");

        return 0;
err_free:
        if (unlikely(trans)) {
                mbuffer_free(newbuf);
        }
err_ret:
        return ret;
}

#endif

/**
 * Read part of a chunk from disk.
 *
 * Connects to the disk slot, performs the read under the slot read lock
 * and releases the slot again. When the read fails with EIO the disk is
 * closed via __diskmd_close() before the slot is released.
 *
 * @param chkid    chunk being read (must not be NULL)
 * @param loc      disk location of the chunk
 * @param buf      buffer receiving the data
 * @param _offset  offset inside the chunk
 * @param prio     priority forwarded to the disk operation
 * @return 0 on success, otherwise the failing step's error
 */
int IO_FUNC diskmd_aio_read(const chkid_t *chkid, const diskloc_t *loc, buffer_t *buf, int _offset, int prio)
{
        int ret;
        disk_t *dsk;

        YASSERT(chkid);

        ret = __diskmd_slot_connect(chkid, loc, &dsk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_slot_rdlock(dsk);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __diskmd_aio_read(dsk, chkid, loc, buf, _offset, prio);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        disk_slot_unlock(dsk);
        __diskmd_slot_release(dsk);

        return 0;
err_lock:
        disk_slot_unlock(dsk);
        /* an I/O error takes the disk out of service */
        if (ret == EIO) {
                __diskmd_close(dsk);
        }
err_release:
        __diskmd_slot_release(dsk);
err_ret:
        return ret;
}

#if ENABLE_ALIGN_NEW

/*
 * Write to a chunk through the disk's aio_writev operation.
 *
 * The on-disk offset is loc->idx * LICH_CHUNK_SPLIT plus _offset, the
 * offset inside the chunk. Returns 0 or the error from aio_writev.
 */
static int IO_FUNC  __diskmd_aio_write(disk_t *disk, const chkid_t *chkid,
                                       const diskloc_t *loc, const buffer_t *buf,
                                       int _offset, int prio)
{
        int ret;
        uint64_t chunk_base = (uint64_t)loc->idx * LICH_CHUNK_SPLIT;
        uint64_t disk_off = chunk_base + _offset;

        DISKMD_ANALYSIS_BEGIN(0);

        YASSERT(chkid);

        ret = disk->dop->aio_writev(disk, chkid, buf, disk_off, prio);
        if (unlikely(ret)) {
                DERROR("disk[%u] ret %d %s\n", loc->diskid, ret, strerror(ret));
                GOTO(err_ret, ret);
        }

        DISKMD_ANALYSIS_UPDATE(0, loc, IO_WARN, "disk_aio_write");

        return 0;
err_ret:
        return ret;
}

#else

/*
 * Write to a chunk, doing a read-modify-write when the request is not
 * aligned.
 *
 * Alignment applies to non-raw chunks, and to raw chunks when
 * gloconf.read_modify_write is set. For an unaligned request the covering
 * aligned range is read into a temporary buffer, the caller's data is
 * copied over it at the right displacement, and the temporary buffer is
 * written back. Otherwise the caller's buffer is written directly.
 *
 * Returns 0, or the error from the translation / read / write step.
 */
static int IO_FUNC  __diskmd_aio_write(disk_t *disk, const chkid_t *chkid,
                                       const diskloc_t *loc, const buffer_t *buf,
                                       int _offset, int prio)
{
        int ret, trans = 0;
        uint64_t newoff;
        seg_t *seg;
        buffer_t *newbuf = NULL, __tmp;

        //_offset is the offset within the chunk
        uint64_t offset = (uint64_t)loc->idx * LICH_CHUNK_SPLIT;
        uint64_t real_offset = offset + _offset;

        DISKMD_ANALYSIS_BEGIN(0);

        YASSERT(chkid);

        int align = 0;
        align = (chkid->type != __RAW_CHUNK__
                 || (chkid->type == __RAW_CHUNK__
                     && gloconf.read_modify_write));

        if (unlikely(align && !is_aligned(buf, real_offset))) {
                trans = 1;

                newbuf = &__tmp;
                ret = buffer_align_trans(buf, real_offset, newbuf, &newoff);
                if (unlikely(ret))
                        GOTO(err_ret, ret);

#if ENABLE_CHUNK_DEBUG
                DINFO("aio align write "CHKID_FORMAT" disk %u idx %u "
                                "raw (size %u chunk offset %d disk offset %ju) "
                                "align (size %u disk offset %ju)\n",
                                CHKID_ARG(chkid), loc->diskid, loc->idx,
                                buf->len, _offset, real_offset,
                                newbuf->len, newoff
                    );
#else
                DBUG("aio align write "CHKID_FORMAT" disk %u idx %u "
                                "raw (size %u chunk offset %d disk offset %ju) "
                                "align (size %u disk offset %ju)\n",
                                CHKID_ARG(chkid), loc->diskid, loc->idx,
                                buf->len, _offset, real_offset,
                                newbuf->len, newoff
                    );

#endif
                /* read the aligned range first; note the fixed priority 1 */
                ret = disk->dop->aio_readv(disk, chkid, newbuf, newoff, 1);
                if (unlikely(ret))
                        GOTO(err_free, ret);

                /* overlay the caller's data onto the first segment at the
                 * displacement of the original offset within the aligned range */
                seg = (seg_t *)newbuf->list.next;
                YASSERT(buf->len <= (seg->len - (real_offset - newoff)));
                mbuffer_get(buf, seg->handler.ptr + (real_offset - newoff), buf->len);
        } else {
                /* aligned (or alignment not required): write the caller's buffer as is */
                newbuf = (buffer_t *)buf;
                newoff = real_offset;
        }

        YASSERT(real_offset >= newoff);

        ret = disk->dop->aio_writev(disk, chkid, newbuf, newoff, prio);
        if (unlikely(ret)) {
                DERROR("disk[%u] ret %d %s\n", loc->diskid, ret, strerror(ret));
                GOTO(err_free, ret);
        }

        /* only the temporary buffer is owned here; the caller's buf is not freed */
        if (unlikely(trans)) {
                mbuffer_free(newbuf);
        }

        DISKMD_ANALYSIS_UPDATE(0, loc, IO_WARN, "disk_aio_write");

        return 0;
err_free:
        if (unlikely(trans)) {
                mbuffer_free(newbuf);
        }
err_ret:
        return ret;
}

#endif

/**
 * Write part of a chunk to disk.
 *
 * Connects to the disk slot, performs the write under the slot read lock
 * and releases the slot again. When the write fails with EIO the disk is
 * closed via __diskmd_close() before the slot is released.
 *
 * @param chkid    chunk being written (must not be NULL)
 * @param loc      disk location of the chunk
 * @param buf      data to write
 * @param _offset  offset inside the chunk
 * @param prio     priority forwarded to the disk operation
 * @return 0 on success, otherwise the failing step's error
 */
int IO_FUNC diskmd_aio_write(const chkid_t *chkid, const diskloc_t *loc, const buffer_t *buf, int _offset, int prio)
{
        int ret;
        disk_t *dsk;

        YASSERT(chkid);

        ret = __diskmd_slot_connect(chkid, loc, &dsk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk_slot_rdlock(dsk);
        if (unlikely(ret))
                GOTO(err_release, ret);

        ret = __diskmd_aio_write(dsk, chkid, loc, buf, _offset, prio);
        if (unlikely(ret))
                GOTO(err_lock, ret);

        disk_slot_unlock(dsk);
        __diskmd_slot_release(dsk);

        return 0;
err_lock:
        disk_slot_unlock(dsk);
        /* an I/O error takes the disk out of service */
        if (ret == EIO) {
                __diskmd_close(dsk);
        }
err_release:
        __diskmd_slot_release(dsk);
err_ret:
        return ret;
}

int diskmd_disk_write_check(int diskid)
{
        int ret;
        disk_t *disk;
        char str[PAGE_SIZE] = {0};
        char tmp[PAGE_SIZE] = {0};

        ret = disk_slot_get(diskid, &disk);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        if (!disk_avaiable(disk)) {
                ret = EIO;
                GOTO(err_ret, ret);
        }

        buffer_t buf1, buf2;

        ret = mbuffer_init(&buf1, 0);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = mbuffer_init(&buf2, PAGE_SIZE);
        if (unlikely(ret))
                GOTO(err_ret, ret);

        memset(str, 1, sizeof(str));

        ret = mbuffer_appendmem(&buf1, str, sizeof(str));
        if (unlikely(ret))
                GOTO(err_ret, ret);

        ret = disk->dop->aio_writev(disk, NULL, &buf1, 0, 1);
        if (ret)
                YASSERT(0);

        ret = disk->dop->aio_readv(disk, NULL, &buf2, 0, 1);
        if (ret)
                YASSERT(0);

        ret = mbuffer_popmsg(&buf2, tmp, buf2.len);
        if (ret)
                YASSERT(0);

        YASSERT(0 == memcmp(str, tmp, PAGE_SIZE));

        disk_slot_release(diskid);

        mbuffer_free(&buf1);
        mbuffer_free(&buf2);
        return 0;
err_ret:
        return ret;
}

int diskmd_aio_fsync(int fd, int prio)
{
        int ret;
        struct iocb iocb;
        task_t task;

        /*can not use, io_sbumit return 22 EINVEL, don't know why */
        io_prep_fsync(&iocb, fd);

        task = schedule_task_get();
        iocb.data = &task;

        DBUG("aio yield, task %u\n", task.taskid);
        ret = aio_commit(&iocb, prio, 1);
        if (unlikely(ret)) {
                DWARN("task[%u] retval %u\n", task.taskid, ret);
                GOTO(err_ret, ret);
        }

        DBUG("aio resume, task %u\n", task.taskid);

        return 0;
err_ret:
        return ret;
}

int diskmd_real_path(char *path, struct stat* stbuf)
{
        int ret;
        char buf[MAX_PATH_LEN];
        char *tmp;

        ret = lstat(path, stbuf);
        if (ret < 0) {
                ret = errno;
                goto err_ret;
        }

        while (S_ISLNK(stbuf->st_mode)) {
                memset(buf, 0, sizeof(buf));

                ret = readlink(path, buf, sizeof(buf));
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }

                if (buf[0] != '/') {
                        tmp = strrchr(path, '/');
                        strcpy(tmp + 1, buf);
                        strcpy(buf, path);
                }
                tmp = realpath(buf, path);
                (void) tmp;

                ret = lstat(path, stbuf);
                if (ret < 0) {
                        ret = errno;
                        GOTO(err_ret, ret);
                }
        }

        return 0;
err_ret:
        return ret;
}
